// Copyright 2014 The mqrouter Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pulsar

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"gitee.com/xfrm/middleware/xtime"

	"gitee.com/xfrm/middleware/xmq/pub"

	"github.com/apache/pulsar-client-go/pulsar"

	"gitee.com/xfrm/middleware/xtrace"

	. "gitee.com/xfrm/middleware/internal/middlewareconf"
)

// MessageID is the message identifier type returned by Producer.WriteMsg and
// handed to WriteAsyncMsg callbacks. It embeds pulsar.MessageID and declares
// no additional methods.
type MessageID interface {
	pulsar.MessageID
}

// ProducerMessage wraps a *pulsar.ProducerMessage. It is the message type
// passed to the callback given to Producer.WriteAsyncMsg.
type ProducerMessage struct {
	*pulsar.ProducerMessage
}

// Producer is a Pulsar producer bundled with the client that created it.
// The embedded pulsar.Producer exposes the underlying client API directly;
// WriteMsg and WriteAsyncMsg add JSON encoding, trace tags and timing stats.
type Producer struct {
	pulsar.Producer
	// client is the Pulsar client the producer was created from; Close shuts it down.
	client pulsar.Client
	// config holds the options the producer was created with; its Topic is used for trace tags.
	config *ProducerConfig
	// url is the broker address string; it is recorded on spans as the peer address tag.
	url    string
}

// NewProducer creates a Pulsar producer for topic, using the MQ settings that
// the middleware configuration returns for that topic.
//
// The configuration lookup uses the topic name as given; the name is then
// passed through pub.WrapTopicFromContext before the producer is created.
// On success the returned Producer owns a newly created Pulsar client, and
// Close releases both. If the producer cannot be created, the client is
// closed before the error is returned.
func NewProducer(ctx context.Context, topic string) (*Producer, error) {
	middlewareConfig, err := GetMiddlewareConfig()
	if err != nil {
		return nil, err
	}
	config, err := middlewareConfig.GetMQConfig(ctx, topic, MiddlewareTypePulsar)
	if err != nil {
		return nil, err
	}

	topic = pub.WrapTopicFromContext(ctx, topic)
	// The joined broker list is used both as the client URL and as the
	// producer's recorded url, so build it once.
	broker := strings.Join(config.MQAddr, ApolloMQBrokersSep)
	producerConfig := &ProducerConfig{
		ProducerOptions: &pulsar.ProducerOptions{
			Topic:                   topic,
			BatchingMaxMessages:     uint(config.BatchSize),
			BatchingMaxPublishDelay: config.BatchTimeout,
			BatchingMaxSize:         config.BatchKBSize,
			Name:                    getName(),
			SendTimeout:             config.SendTimeout,
		},
		Broker:           broker,
		OperationTimeout: config.TimeOut,
	}

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:              producerConfig.Broker,
		OperationTimeout: producerConfig.OperationTimeout,
	})
	if err != nil {
		// %w leaves the message text unchanged while keeping the cause
		// reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("Could not instantiate Pulsar client: %w", err)
	}

	producer, err := client.CreateProducer(*producerConfig.ProducerOptions)
	if err != nil {
		// Do not leak the client when the producer cannot be created.
		client.Close()
		return nil, err
	}
	return &Producer{
		Producer: producer,
		client:   client,
		url:      broker,
		config:   producerConfig,
	}, nil
}

// NewProducerWithConfig creates a Pulsar producer from a caller-supplied
// configuration.
//
// config is checked with validateProducerConfig first. A new Pulsar client is
// then created for config.Broker, and a producer is created from
// config.ProducerOptions. If the producer cannot be created, the client is
// closed before the error is returned.
//
// config.Topic is overwritten in place with the result of
// pub.WrapTopicFromContext, and the returned Producer keeps a reference to
// config rather than a copy.
// NOTE(review): calling this twice with the same config wraps an already
// wrapped topic; confirm WrapTopicFromContext tolerates that.
func NewProducerWithConfig(ctx context.Context, config *ProducerConfig) (*Producer, error) {
	fun := "NewProducerWithConfig -->"

	if err := validateProducerConfig(config); err != nil {
		return nil, err
	}
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               config.Broker,
		OperationTimeout:  config.OperationTimeout,
		ConnectionTimeout: config.ConnectionTimeout,
	})
	if err != nil {
		// %w leaves the message text unchanged while keeping the cause
		// reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("%s new pulsar client, err: %w", fun, err)
	}

	config.Topic = pub.WrapTopicFromContext(ctx, config.Topic)
	producer, err := client.CreateProducer(*config.ProducerOptions)
	if err != nil {
		// Do not leak the client when the producer cannot be created.
		client.Close()
		return nil, fmt.Errorf("%s create producer, err: %w", fun, err)
	}

	return &Producer{
		Producer: producer,
		client:   client,
		url:      config.Broker,
		config:   config,
	}, nil
}

// setSpanTags annotates span with this producer's messaging metadata: the
// component and message-bus type, the producer span kind, the destination
// topic taken from p.config.Topic, and the broker address held in p.url.
func (p *Producer) setSpanTags(span xtrace.Span) {
	tag := span.SetTag

	// Component, bus type and span kind.
	tag(xtrace.TagComponent, pub.TraceComponent)
	tag(xtrace.TagPalfishMessageBusType, pub.TraceMessageBusTypePulsar)
	tag(xtrace.TagSpanKind, xtrace.SpanKindProducer)

	// Destination: the same topic is recorded under both destination tags.
	topic := p.config.Topic
	tag(xtrace.TagMessageBusDestination, topic)
	tag(xtrace.TagMessagingSystem, xtrace.MessagingSystemPulsar)
	tag(xtrace.TagMessagingDestinationKind, xtrace.MessagingDestinationKindTopic)
	tag(xtrace.TagMessagingDestination, topic)

	// Peer: the broker address string.
	tag(xtrace.TagNetPeerIP, p.url)
}

// WriteMsg JSON-encodes v and publishes it synchronously, using k as the
// message key and the current time as the event time.
//
// When ctx carries a trace span, the producer's messaging tags are set on it.
// If v cannot be marshaled, nothing is sent and the marshal error is returned.
// Otherwise the message ID and error from Send are returned as-is; the time
// spent in Send is reported through pub.StatReqDuration either way.
func (p *Producer) WriteMsg(ctx context.Context, k string, v interface{}) (MessageID, error) {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setSpanTags(span)
	}

	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}

	timer := xtime.NewTimeStat()
	outgoing := &pulsar.ProducerMessage{
		Key:       k,
		Payload:   payload,
		EventTime: time.Now(),
	}
	id, sendErr := p.Producer.Send(ctx, outgoing)

	// Record the duration before returning so failed sends are counted too.
	pub.StatReqDuration(ctx, p.Producer.Topic(), "Producer.WriteMsg", pub.TraceMessageBusTypePulsar, timer.Millisecond())

	return id, sendErr
}

// WriteAsyncMsg JSON-encodes v and hands it to the producer's SendAsync,
// using k as the message key and the current time as the event time.
//
// When ctx carries a trace span, the producer's messaging tags are set on it.
// The returned error reports only a JSON marshaling failure, in which case
// nothing is sent. The outcome of the send itself is delivered to callback,
// which receives the message ID, the sent message wrapped in a
// ProducerMessage, and the send error. callback may be nil, in which case the
// outcome is discarded.
//
// The duration reported through pub.StatReqDuration covers only the SendAsync
// call itself, not the time until callback is invoked.
func (p *Producer) WriteAsyncMsg(ctx context.Context, k string, v interface{}, callback func(MessageID, *ProducerMessage, error)) error {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		p.setSpanTags(span)
	}

	payload, err := json.Marshal(v)
	if err != nil {
		return err
	}

	st := xtime.NewTimeStat()
	p.Producer.SendAsync(ctx, &pulsar.ProducerMessage{
		Payload:   payload,
		Key:       k,
		EventTime: time.Now(),
	}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, sendErr error) {
		// Calling a nil callback here would panic outside the caller's
		// goroutine, so a nil callback is treated as fire-and-forget.
		if callback == nil {
			return
		}
		callback(id, &ProducerMessage{ProducerMessage: message}, sendErr)
	})
	pub.StatReqDuration(ctx, p.Producer.Topic(), "Producer.WriteAsyncMsg", pub.TraceMessageBusTypePulsar, st.Millisecond())

	// Marshaling succeeded; send errors are reported through callback.
	return nil
}

// Close shuts down the embedded producer and then the Pulsar client it was
// created from. It always returns nil.
func (p *Producer) Close() error {
	// Producer first, then the client it was created from.
	producer := p.Producer
	producer.Close()

	client := p.client
	client.Close()

	return nil
}

// validateProducerConfig checks that config carries the minimum needed to
// create a producer.
//
// It returns ErrConfigNotNull when config is nil, ErrTopicInfoNotNull when
// the producer options are missing or the topic is empty, and
// ErrBrokerNotNull when the broker address is empty. It returns nil when
// config is usable.
func validateProducerConfig(config *ProducerConfig) error {
	if config == nil {
		return ErrConfigNotNull
	}
	// Topic is promoted from the embedded *pulsar.ProducerOptions, so reading
	// it through a nil options pointer would panic. Missing options are
	// reported as missing topic information.
	if config.ProducerOptions == nil || config.Topic == "" {
		return ErrTopicInfoNotNull
	}
	if config.Broker == "" {
		return ErrBrokerNotNull
	}
	return nil
}
